Simplify reply logic in xenstored. Maintain a linked list
authorkaf24@firebug.cl.cam.ac.uk <kaf24@firebug.cl.cam.ac.uk>
Sun, 9 Oct 2005 22:53:03 +0000 (23:53 +0100)
committerkaf24@firebug.cl.cam.ac.uk <kaf24@firebug.cl.cam.ac.uk>
Sun, 9 Oct 2005 22:53:03 +0000 (23:53 +0100)
of pending replies that are sent out in order.

Currently we only read new requests when the reply list is
empty. In fact there is no good reason for this restriction.
Another interesting point is that (on my test machine)
hotplug blk setup fails if xenstored_client connects to
xenstored via the unix domain socket rather than through the
kernel --- this points to some user/kernel races that are
'fixed' by the extra serialisation of the in-kernel mutexes.
It definitely needs looking into.

Signed-off-by: Keir Fraser <keir@xensource.com>
tools/xenstore/xenstored_core.c
tools/xenstore/xenstored_core.h
tools/xenstore/xenstored_domain.c
tools/xenstore/xenstored_watch.c
tools/xenstore/xenstored_watch.h
xen/include/public/io/xs_wire.h

index 25c9b03bb80d841c31538818a4aa0c0397aa577a..4c998bea70b6c8a54e6467049a8205e1c7a673a6 100644 (file)
@@ -235,52 +235,50 @@ void trace(const char *fmt, ...)
        talloc_free(str);
 }
 
-static bool write_message(struct connection *conn)
+static bool write_messages(struct connection *conn)
 {
        int ret;
-       struct buffered_data *out = conn->out;
-
-       if (out->inhdr) {
-               if (verbose)
-                       xprintf("Writing msg %s (%s) out to %p\n",
-                               sockmsg_string(out->hdr.msg.type),
-                               out->buffer, conn);
-               ret = conn->write(conn, out->hdr.raw + out->used,
-                                 sizeof(out->hdr) - out->used);
+       struct buffered_data *out, *tmp;
+
+       list_for_each_entry_safe(out, tmp, &conn->out_list, list) {
+               if (out->inhdr) {
+                       if (verbose)
+                               xprintf("Writing msg %s (%s) out to %p\n",
+                                       sockmsg_string(out->hdr.msg.type),
+                                       out->buffer, conn);
+                       ret = conn->write(conn, out->hdr.raw + out->used,
+                                         sizeof(out->hdr) - out->used);
+                       if (ret < 0)
+                               return false;
+
+                       out->used += ret;
+                       if (out->used < sizeof(out->hdr))
+                               return true;
+
+                       out->inhdr = false;
+                       out->used = 0;
+
+                       /* Second write might block if non-zero. */
+                       if (out->hdr.msg.len && !conn->domain)
+                               return true;
+               }
+
+               ret = conn->write(conn, out->buffer + out->used,
+                                 out->hdr.msg.len - out->used);
+
                if (ret < 0)
                        return false;
 
                out->used += ret;
-               if (out->used < sizeof(out->hdr))
+               if (out->used != out->hdr.msg.len)
                        return true;
 
-               out->inhdr = false;
-               out->used = 0;
+               trace_io(conn, "OUT", out);
 
-               /* Second write might block if non-zero. */
-               if (out->hdr.msg.len && !conn->domain)
-                       return true;
+               list_del(&out->list);
+               talloc_free(out);
        }
 
-       ret = conn->write(conn, out->buffer + out->used,
-                         out->hdr.msg.len - out->used);
-
-       if (ret < 0)
-               return false;
-
-       out->used += ret;
-       if (out->used != out->hdr.msg.len)
-               return true;
-
-       trace_io(conn, "OUT", out);
-       conn->out = NULL;
-       talloc_free(out);
-
-       queue_next_event(conn);
-
-       /* No longer busy? */
-       if (!conn->out)
-               conn->state = OK;
        return true;
 }
 
@@ -297,9 +295,9 @@ static int destroy_conn(void *_conn)
                FD_SET(conn->fd, &set);
                none.tv_sec = none.tv_usec = 0;
 
-               while (conn->out
+               while (!list_empty(&conn->out_list)
                       && select(conn->fd+1, NULL, &set, NULL, &none) == 1)
-                       if (!write_message(conn))
+                       if (!write_messages(conn))
                                break;
                close(conn->fd);
        }
@@ -326,9 +324,9 @@ static int initialize_set(fd_set *inset, fd_set *outset, int sock, int ro_sock)
        list_for_each_entry(i, &connections, list) {
                if (i->domain)
                        continue;
-               if (i->state == OK)
+               if (list_empty(&i->out_list))
                        FD_SET(i->fd, inset);
-               if (i->out)
+               if (!list_empty(&i->out_list))
                        FD_SET(i->fd, outset);
                if (i->fd > max)
                        max = i->fd;
@@ -594,14 +592,7 @@ void send_reply(struct connection *conn, enum xsd_sockmsg_type type,
        bdata->hdr.msg.len = len;
        memcpy(bdata->buffer, data, len);
 
-       /* There might be an event going out now.  Queue behind it. */
-       if (conn->out) {
-               assert(conn->out->hdr.msg.type == XS_WATCH_EVENT);
-               assert(!conn->waiting_reply);
-               conn->waiting_reply = bdata;
-       } else
-               conn->out = bdata;
-       conn->state = BUSY;
+       list_add_tail(&bdata->list, &conn->out_list);
 }
 
 /* Some routines (write, mkdir, etc) just need a non-error return */
@@ -1148,8 +1139,6 @@ static void consider_message(struct connection *conn)
        enum xsd_sockmsg_type volatile type = conn->in->hdr.msg.type;
        jmp_buf talloc_fail;
 
-       assert(conn->state == OK);
-
        /* For simplicity, we kill the connection on OOM. */
        talloc_set_fail_handler(out_of_mem, &talloc_fail);
        if (setjmp(talloc_fail)) {
@@ -1186,10 +1175,7 @@ end:
 static void handle_input(struct connection *conn)
 {
        int bytes;
-       struct buffered_data *in;
-
-       assert(conn->state == OK);
-       in = conn->in;
+       struct buffered_data *in = conn->in;
 
        /* Not finished header yet? */
        if (in->inhdr) {
@@ -1237,7 +1223,7 @@ bad_client:
 
 static void handle_output(struct connection *conn)
 {
-       if (!write_message(conn))
+       if (!write_messages(conn))
                talloc_free(conn);
 }
 
@@ -1254,8 +1240,6 @@ struct connection *new_connection(connwritefn_t *write, connreadfn_t *read)
        if (!new)
                return NULL;
 
-       new->state = OK;
-       new->out = new->waiting_reply = NULL;
        new->fd = -1;
        new->id = 0;
        new->domain = NULL;
@@ -1263,6 +1247,7 @@ struct connection *new_connection(connwritefn_t *write, connreadfn_t *read)
        new->write = write;
        new->read = read;
        new->can_write = true;
+       INIT_LIST_HEAD(&new->out_list);
        INIT_LIST_HEAD(&new->watches);
 
        talloc_set_fail_handler(out_of_mem, &talloc_fail);
@@ -1317,23 +1302,17 @@ void dump_connection(void)
        list_for_each_entry(i, &connections, list) {
                printf("Connection %p:\n", i);
                printf("    state = %s\n",
-                      i->state == OK ? "OK"
-                      : i->state == BUSY ? "BUSY"
-                      : "INVALID");
+                      list_empty(&i->out_list) ? "OK" : "BUSY");
                if (i->id)
                        printf("    id = %i\n", i->id);
                if (!i->in->inhdr || i->in->used)
                        printf("    got %i bytes of %s\n",
                               i->in->used, i->in->inhdr ? "header" : "data");
+#if 0
                if (i->out)
                        printf("    sending message %s (%s) out\n",
                               sockmsg_string(i->out->hdr.msg.type),
                               i->out->buffer);
-               if (i->waiting_reply)
-                       printf("    ... and behind is queued %s (%s)\n",
-                              sockmsg_string(i->waiting_reply->hdr.msg.type),
-                              i->waiting_reply->buffer);
-#if 0
                if (i->transaction)
                        dump_transaction(i);
                if (i->domain)
@@ -1604,3 +1583,13 @@ int main(int argc, char *argv[])
                max = initialize_set(&inset, &outset, *sock, *ro_sock);
        }
 }
+
+/*
+ * Local variables:
+ *  c-file-style: "linux"
+ *  indent-tabs-mode: t
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ *  tab-width: 8
+ * End:
+ */
index e017bddf429ecc7885bbc5056f52bd1465c504a9..c50936f1bc6cb83b646742e4684a6bb15dacaed2 100644 (file)
 
 struct buffered_data
 {
+       struct list_head list;
+
        /* Are we still doing the header? */
        bool inhdr;
+
        /* How far are we? */
        unsigned int used;
+
        union {
                struct xsd_sockmsg msg;
                char raw[sizeof(struct xsd_sockmsg)];
        } hdr;
+
        /* The actual data. */
        char *buffer;
 };
@@ -47,14 +52,6 @@ struct connection;
 typedef int connwritefn_t(struct connection *, const void *, unsigned int);
 typedef int connreadfn_t(struct connection *, void *, unsigned int);
 
-enum state
-{
-       /* Doing action, not listening */
-       BUSY,
-       /* Completed */
-       OK,
-};
-
 struct connection
 {
        struct list_head list;
@@ -62,12 +59,9 @@ struct connection
        /* The file descriptor we came in on. */
        int fd;
 
-       /* Who am I?  0 for socket connections. */
+       /* Who am I? 0 for socket connections. */
        domid_t id;
 
-       /* Blocked on transaction?  Busy? */
-       enum state state;
-
        /* Is this a read-only connection? */
        bool can_write;
 
@@ -75,10 +69,7 @@ struct connection
        struct buffered_data *in;
 
        /* Buffered output data */
-       struct buffered_data *out;
-
-       /* If we had a watch fire outgoing when we needed to reply... */
-       struct buffered_data *waiting_reply;
+       struct list_head out_list;
 
        /* My transaction, if any. */
        struct transaction *transaction;
@@ -172,3 +163,13 @@ void trace(const char *fmt, ...);
 extern int event_fd;
 
 #endif /* _XENSTORED_CORE_H */
+
+/*
+ * Local variables:
+ *  c-file-style: "linux"
+ *  indent-tabs-mode: t
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ *  tab-width: 8
+ * End:
+ */
index a7cd4493ad73d6e2d6420e02beb34c607f3bf9a1..15c98bfb493d4771b93210560f3e2f8679b04833 100644 (file)
@@ -276,12 +276,14 @@ void handle_event(void)
 
 bool domain_can_read(struct connection *conn)
 {
-       return conn->state == OK && buffer_has_input(conn->domain->input);
+       return (list_empty(&conn->out_list) &&
+                buffer_has_input(conn->domain->input));
 }
 
 bool domain_can_write(struct connection *conn)
 {
-       return conn->out && buffer_has_output_room(conn->domain->output);
+       return (!list_empty(&conn->out_list) &&
+                buffer_has_output_room(conn->domain->output));
 }
 
 static struct domain *new_domain(void *context, domid_t domid,
index dfb0cd1a3e9ca9fb3f9a4574870e2a8e151a1321..6f3c2e4e03b044234eee5e1286ca2ef4b7fbc1a5 100644 (file)
 #include "xenstored_test.h"
 #include "xenstored_domain.h"
 
-/* FIXME: time out unacked watches. */
-struct watch_event
-{
-       /* The events on this watch. */
-       struct list_head list;
-
-       /* Data to send (node\0token\0). */
-       unsigned int len;
-       char *data;
-};
-
 struct watch
 {
        /* Watches on this connection */
@@ -58,50 +47,17 @@ struct watch
        char *node;
 };
 
-/* Look through our watches: if any of them have an event, queue it. */
-void queue_next_event(struct connection *conn)
-{
-       struct watch_event *event;
-       struct watch *watch;
-
-       /* We had a reply queued already?  Send it: other end will
-        * discard watch. */
-       if (conn->waiting_reply) {
-               conn->out = conn->waiting_reply;
-               conn->waiting_reply = NULL;
-               return;
-       }
-
-       list_for_each_entry(watch, &conn->watches, list) {
-               event = list_top(&watch->events, struct watch_event, list);
-               if (event) {
-                       list_del(&event->list);
-                       talloc_free(event);
-                       send_reply(conn,XS_WATCH_EVENT,event->data,event->len);
-                       break;
-               }
-       }
-}
-
-static int destroy_watch_event(void *_event)
-{
-       struct watch_event *event = _event;
-
-       trace_destroy(event, "watch_event");
-       return 0;
-}
-
 static void add_event(struct connection *conn,
                      struct watch *watch,
                      const char *name)
 {
-       struct watch_event *event;
+       /* Data to send (node\0token\0). */
+       unsigned int len;
+       char *data;
 
        if (!check_event_node(name)) {
                /* Can this conn load node, or see that it doesn't exist? */
-               struct node *node;
-
-               node = get_node(conn, name, XS_PERM_READ);
+               struct node *node = get_node(conn, name, XS_PERM_READ);
                if (!node && errno != ENOENT)
                        return;
        }
@@ -112,14 +68,12 @@ static void add_event(struct connection *conn,
                        name++;
        }
 
-       event = talloc(watch, struct watch_event);
-       event->len = strlen(name) + 1 + strlen(watch->token) + 1;
-       event->data = talloc_array(event, char, event->len);
-       strcpy(event->data, name);
-       strcpy(event->data + strlen(name) + 1, watch->token);
-       talloc_set_destructor(event, destroy_watch_event);
-       list_add_tail(&event->list, &watch->events);
-       trace_create(event, "watch_event");
+       len = strlen(name) + 1 + strlen(watch->token) + 1;
+       data = talloc_array(watch, char, len);
+       strcpy(data, name);
+       strcpy(data + strlen(name) + 1, watch->token);
+        send_reply(conn, XS_WATCH_EVENT, data, len);
+       talloc_free(data);
 }
 
 /* FIXME: we fail to fire on out of memory.  Should drop connections. */
@@ -139,11 +93,6 @@ void fire_watches(struct connection *conn, const char *name, bool recurse)
                                add_event(i, watch, name);
                        else if (recurse && is_child(watch->node, name))
                                add_event(i, watch, watch->node);
-                       else
-                               continue;
-                       /* If connection not doing anything, queue this. */
-                       if (i->state == OK)
-                               queue_next_event(i);
                }
        }
 }
@@ -231,13 +180,19 @@ void do_unwatch(struct connection *conn, struct buffered_data *in)
 void dump_watches(struct connection *conn)
 {
        struct watch *watch;
-       struct watch_event *event;
 
-       list_for_each_entry(watch, &conn->watches, list) {
+       list_for_each_entry(watch, &conn->watches, list)
                printf("    watch on %s token %s\n",
                       watch->node, watch->token);
-               list_for_each_entry(event, &watch->events, list)
-                       printf("        event: %s\n", event->data);
-       }
 }
 #endif
+
+/*
+ * Local variables:
+ *  c-file-style: "linux"
+ *  indent-tabs-mode: t
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ *  tab-width: 8
+ * End:
+ */
index ed8892c4aa6070dca7a26f9c6d6e72db0fda8770..2eccff94766bb265483298bd30555cf9ca03b002 100644 (file)
 #include "xenstored_core.h"
 
 void do_watch(struct connection *conn, struct buffered_data *in);
-void do_watch_ack(struct connection *conn, const char *token);
 void do_unwatch(struct connection *conn, struct buffered_data *in);
 
-/* Is this a watch event message for this connection? */
-bool is_watch_event(struct connection *conn, struct buffered_data *out);
-
-/* Look through our watches: if any of them have an event, queue it. */
-void queue_next_event(struct connection *conn);
-
-/* Fire all watches: recurse means all the children are affected (ie. rm).
- */
+/* Fire all watches: recurse means all the children are affected (ie. rm). */
 void fire_watches(struct connection *conn, const char *name, bool recurse);
 
 void dump_watches(struct connection *conn);
index 3c26cc0d8027fe41344e4689e6e82a219a4d6d4b..4ab5633b8a23fff13735a9db283747d43195a468 100644 (file)
 
 enum xsd_sockmsg_type
 {
-       XS_DEBUG,
-       XS_DIRECTORY,
-       XS_READ,
-       XS_GET_PERMS,
-       XS_WATCH,
-       XS_UNWATCH,
-       XS_TRANSACTION_START,
-       XS_TRANSACTION_END,
-       XS_INTRODUCE,
-       XS_RELEASE,
-       XS_GET_DOMAIN_PATH,
-       XS_WRITE,
-       XS_MKDIR,
-       XS_RM,
-       XS_SET_PERMS,
-       XS_WATCH_EVENT,
-       XS_ERROR,
+    XS_DEBUG,
+    XS_DIRECTORY,
+    XS_READ,
+    XS_GET_PERMS,
+    XS_WATCH,
+    XS_UNWATCH,
+    XS_TRANSACTION_START,
+    XS_TRANSACTION_END,
+    XS_INTRODUCE,
+    XS_RELEASE,
+    XS_GET_DOMAIN_PATH,
+    XS_WRITE,
+    XS_MKDIR,
+    XS_RM,
+    XS_SET_PERMS,
+    XS_WATCH_EVENT,
+    XS_ERROR,
 };
 
 #define XS_WRITE_NONE "NONE"
@@ -56,38 +56,40 @@ enum xsd_sockmsg_type
 /* We hand errors as strings, for portability. */
 struct xsd_errors
 {
-       int errnum;
-       const char *errstring;
+    int errnum;
+    const char *errstring;
 };
 #define XSD_ERROR(x) { x, #x }
 static struct xsd_errors xsd_errors[] __attribute__((unused)) = {
-       XSD_ERROR(EINVAL),
-       XSD_ERROR(EACCES),
-       XSD_ERROR(EEXIST),
-       XSD_ERROR(EISDIR),
-       XSD_ERROR(ENOENT),
-       XSD_ERROR(ENOMEM),
-       XSD_ERROR(ENOSPC),
-       XSD_ERROR(EIO),
-       XSD_ERROR(ENOTEMPTY),
-       XSD_ERROR(ENOSYS),
-       XSD_ERROR(EROFS),
-       XSD_ERROR(EBUSY),
-       XSD_ERROR(EAGAIN),
-       XSD_ERROR(EISCONN),
+    XSD_ERROR(EINVAL),
+    XSD_ERROR(EACCES),
+    XSD_ERROR(EEXIST),
+    XSD_ERROR(EISDIR),
+    XSD_ERROR(ENOENT),
+    XSD_ERROR(ENOMEM),
+    XSD_ERROR(ENOSPC),
+    XSD_ERROR(EIO),
+    XSD_ERROR(ENOTEMPTY),
+    XSD_ERROR(ENOSYS),
+    XSD_ERROR(EROFS),
+    XSD_ERROR(EBUSY),
+    XSD_ERROR(EAGAIN),
+    XSD_ERROR(EISCONN),
 };
 struct xsd_sockmsg
 {
-       u32 type;
-       u32 len;                /* Length of data following this. */
+    u32 type;  /* XS_??? */
+    u32 req_id;/* Request identifier, echoed in daemon's response.  */
+    u32 tx_id; /* Transaction id (0 if not related to a transaction). */
+    u32 len;   /* Length of data following this. */
 
-       /* Generally followed by nul-terminated string(s). */
+    /* Generally followed by nul-terminated string(s). */
 };
 
 enum xs_watch_type
 {
-       XS_WATCH_PATH = 0,
-       XS_WATCH_TOKEN,
+    XS_WATCH_PATH = 0,
+    XS_WATCH_TOKEN,
 };
 
 #endif /* _XS_WIRE_H */